{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "52dc9b17-1182-44b3-bebf-ae2f508675d3",
   "metadata": {},
   "source": [
    "# Streaming Responses with Strands Agents in Amazon Bedrock AgentCore Runtime\n",
    "\n",
    "## Overview\n",
    "\n",
    "In this tutorial we will learn how to implement streaming responses using Amazon Bedrock AgentCore Runtime. This example demonstrates how to stream partial results as they become available, providing a more responsive user experience for operations that generate large amounts of content or take significant processing time.\n",
    "\n",
    "\n",
    "### Tutorial Details\n",
    "\n",
    "|Information| Details|\n",
    "|:--------------------|:---------------------------------------------------------------------------------|\n",
    "| Tutorial type       | Conversational with Streaming|\n",
    "| Agent type          | Single         |\n",
    "| Agentic Framework   | Strands Agents |\n",
    "| LLM model           | Anthropic Claude Sonnet 4 |\n",
    "| Tutorial components | Streaming responses with AgentCore Runtime, Strands Agent and Amazon Bedrock Model |\n",
    "| Tutorial vertical   | Cross-vertical                                                                   |\n",
    "| Example complexity  | Easy                                                                             |\n",
    "| SDK used            | Amazon BedrockAgentCore Python SDK and boto3|\n",
    "\n",
    "### Tutorial Architecture\n",
    "\n",
    "In this tutorial we will describe how to deploy a streaming agent to AgentCore runtime. \n",
    "\n",
    "For demonstration purposes, we will use a Strands Agent using Amazon Bedrock models with streaming capabilities.\n",
    "\n",
    "In our example we will use a simple agent with two tools: `get_weather` and `get_time`, but with streaming response capabilities.\n",
    "\n",
    "    \n",
    "<div style=\"text-align:left\">\n",
    "    <img src=\"images/architecture_runtime.png\" width=\"60%\"/>\n",
    "</div>\n",
    "\n",
    "### Tutorial Key Features\n",
    "\n",
    "* Streaming responses from agents on Amazon Bedrock AgentCore Runtime\n",
    "* Real-time partial result delivery\n",
    "* Using Amazon Bedrock models with streaming\n",
    "* Using Strands Agents with async streaming support"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "3a676f58ecf52b42",
   "metadata": {},
   "source": [
    "## Prerequisites\n",
    "\n",
    "To execute this tutorial you will need:\n",
    "* Python 3.10+\n",
    "* AWS credentials\n",
    "* Amazon Bedrock AgentCore SDK\n",
    "* Strands Agents\n",
    "* Docker running"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "initial_id",
   "metadata": {
    "jupyter": {
     "is_executing": true
    }
   },
   "outputs": [],
   "source": [
    "#!uv add -r requirements.txt --active"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "932110e6-fca6-47b6-b7c5-c4714a866a80",
   "metadata": {},
   "source": [
    "## Preparing your streaming agent for deployment on AgentCore Runtime\n",
    "\n",
    "Let's now deploy our streaming agents to AgentCore Runtime. The streaming functionality is handled automatically by the AgentCore SDK when you use async generators or yield statements in your entrypoint function.\n",
    "\n",
    "Key points for streaming implementation:\n",
    "* Use `async def` for your entrypoint function\n",
    "* Use `yield` to stream chunks as they become available\n",
    "* The AgentCore SDK automatically handles the Server-Sent Events (SSE) format\n",
    "* Clients will receive Content-Type: text/event-stream responses\n",
    "\n",
    "### Strands Agents with Amazon Bedrock model and Streaming\n",
    "Let's look at our streaming implementation for the Strands Agent using Amazon Bedrock model."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3b845b32-a03e-45c2-a2f0-2afba8069f47",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%writefile strands_claude_streaming.py\n",
    "from strands import Agent, tool\n",
    "from strands_tools import calculator # Import the calculator tool\n",
    "import argparse\n",
    "import json\n",
    "from bedrock_agentcore.runtime import BedrockAgentCoreApp\n",
    "from strands.models import BedrockModel\n",
    "import asyncio\n",
    "from datetime import datetime\n",
    "\n",
    "app = BedrockAgentCoreApp()\n",
    "\n",
    "# Create a custom tool \n",
    "@tool\n",
    "def weather():\n",
    "    \"\"\" Get weather \"\"\" # Dummy implementation\n",
    "    return \"sunny\"\n",
    "\n",
    "@tool\n",
    "def get_time():\n",
    "    \"\"\" Get current time \"\"\"\n",
    "    return datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\")\n",
    "\n",
    "model_id = \"us.anthropic.claude-sonnet-4-20250514-v1:0\"\n",
    "model = BedrockModel(\n",
    "    model_id=model_id,\n",
    ")\n",
    "agent = Agent(\n",
    "    model=model,\n",
    "    tools=[\n",
    "        calculator, weather, get_time\n",
    "    ],\n",
    "    system_prompt=\"\"\"You're a helpful assistant. You can do simple math calculations, \n",
    "    tell the weather, and provide the current time.\"\"\"\n",
    ")\n",
    "\n",
    "@app.entrypoint\n",
    "async def strands_agent_bedrock_streaming(payload):\n",
    "    \"\"\"\n",
    "    Invoke the agent with streaming capabilities\n",
    "    This function demonstrates how to implement streaming responses\n",
    "    with AgentCore Runtime using async generators\n",
    "    \"\"\"\n",
    "    user_input = payload.get(\"prompt\")\n",
    "    print(\"User input:\", user_input)\n",
    "    \n",
    "    try:\n",
    "        # Stream each chunk as it becomes available\n",
    "        async for event in agent.stream_async(user_input):\n",
    "            if \"data\" in event:\n",
    "                yield event[\"data\"]\n",
    "            \n",
    "    except Exception as e:\n",
    "        # Handle errors gracefully in streaming context\n",
    "        error_response = {\"error\": str(e), \"type\": \"stream_error\"}\n",
    "        print(f\"Streaming error: {error_response}\")\n",
    "        yield error_response\n",
    "\n",
    "if __name__ == \"__main__\":\n",
    "    app.run()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c64db7b5-0f1b-475f-9bf2-467b4449d46a",
   "metadata": {},
   "source": [
    "## Understanding Streaming in AgentCore Runtime\n",
    "\n",
    "When you use streaming with AgentCore Runtime, several things happen automatically:\n",
    "\n",
    "### Server-Sent Events (SSE) Format\n",
    "* The AgentCore SDK automatically converts your yielded data into SSE format\n",
    "* Each yield becomes a `data: ` event in the SSE stream\n",
    "* The Content-Type is automatically set to `text/event-stream`\n",
    "\n",
    "### Client Handling\n",
    "* Clients receive real-time updates as your agent processes the request\n",
    "* This enables progressive response display and better user experience\n",
    "* Clients can process partial results before the complete response is ready\n",
    "\n",
    "### Error Handling\n",
    "* Streaming responses should include proper error handling\n",
    "* Errors can be yielded as part of the stream\n",
    "* The stream ends when the function completes or encounters an unhandled exception"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "6820ca8f-a8a8-4f34-b4ef-b6dad3776261",
   "metadata": {},
   "source": [
    "## Deploying the streaming agent to AgentCore Runtime\n",
    "\n",
    "The `CreateAgentRuntime` operation supports comprehensive configuration options, letting you specify container images, environment variables and encryption settings. You can also configure protocol settings (HTTP, MCP) and authorization mechanisms to control how your clients communicate with the agent. \n",
    "\n",
    "**Note:** Operations best practice is to package code as container and push to ECR using CI/CD pipelines and IaC\n",
    "\n",
    "In this tutorial we will use the Amazon Bedrock AgentCode Python SDK to easily package your artifacts and deploy them to AgentCore runtime."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d0861401-a111-4ade-9e02-50f52fdfa9b1",
   "metadata": {},
   "source": [
    "### Creating runtime role\n",
    "\n",
    "Before starting, let's create an IAM role for our AgentCore Runtime. We will do so using the utils function pre-developed for you."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "54dd2fdf-985c-4a70-8b87-071783a209de",
   "metadata": {},
   "outputs": [],
   "source": [
    "import sys\n",
    "import os\n",
    "\n",
    "# Get the current notebook's directory\n",
    "current_dir = os.path.dirname(os.path.abspath('__file__' if '__file__' in globals() else '.'))\n",
    "\n",
    "# Navigate up to the utils.py location\n",
    "utils_dir = os.path.join(current_dir, '..')\n",
    "utils_dir = os.path.join(utils_dir, '..')\n",
    "utils_dir = os.path.abspath(utils_dir)\n",
    "\n",
    "# Add to sys.path\n",
    "sys.path.insert(0, utils_dir)\n",
    "\n",
    "from utils import create_agentcore_role\n",
    "\n",
    "agent_name=\"strands_claude_streaming3\"\n",
    "agentcore_iam_role = create_agentcore_role(agent_name=agent_name)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "8855aceb-b79f-4aaa-b16f-8577c059816a",
   "metadata": {},
   "source": [
    "### Configure AgentCore Runtime deployment\n",
    "\n",
    "Next we will use our starter toolkit to configure the AgentCore Runtime deployment with an entrypoint, the execution role we just created and a requirements file. We will also configure the starter kit to auto create the Amazon ECR repository on launch.\n",
    "\n",
    "During the configure step, your docker file will be generated based on your application code\n",
    "\n",
    "<div style=\"text-align:left\">\n",
    "    <img src=\"images/configure.png\" width=\"60%\"/>\n",
    "</div>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "2e79eba2-ca59-463f-9ebf-56e362d7ae66",
   "metadata": {},
   "outputs": [],
   "source": [
    "from bedrock_agentcore_starter_toolkit import Runtime\n",
    "from boto3.session import Session\n",
    "boto_session = Session()\n",
    "region = boto_session.region_name\n",
    "region\n",
    "\n",
    "agentcore_runtime = Runtime()\n",
    "\n",
    "response = agentcore_runtime.configure(\n",
    "    entrypoint=\"strands_claude_streaming.py\",\n",
    "    execution_role=agentcore_iam_role['Role']['Arn'],\n",
    "    auto_create_ecr=True,\n",
    "    requirements_file=\"requirements.txt\",\n",
    "    region=region,\n",
    "    agent_name=agent_name\n",
    ")\n",
    "response"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "9e1b84cc-798e-472c-ac0b-2c315f4b704d",
   "metadata": {},
   "source": [
    "### Launching streaming agent to AgentCore Runtime\n",
    "\n",
    "Now that we've got a docker file, let's launch the streaming agent to the AgentCore Runtime. This will create the Amazon ECR repository and the AgentCore Runtime\n",
    "\n",
    "<div style=\"text-align:left\">\n",
    "    <img src=\"images/launch.png\" width=\"85%\"/>\n",
    "</div>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "17a32ab8-7701-4900-8055-e24364bdf35c",
   "metadata": {},
   "outputs": [],
   "source": [
    "launch_result = agentcore_runtime.launch()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "a0ae9c09-09db-4a76-871a-92eacd96b9c3",
   "metadata": {},
   "source": [
    "### Checking for the AgentCore Runtime Status\n",
    "Now that we've deployed the AgentCore Runtime, let's check for it's deployment status"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "afa6ac09-9adb-4846-9fc1-4d12aeb74853",
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "\n",
    "status_response = agentcore_runtime.status()\n",
    "status = status_response.endpoint['status']\n",
    "end_status = ['READY', 'CREATE_FAILED', 'DELETE_FAILED', 'UPDATE_FAILED']\n",
    "while status not in end_status:\n",
    "    time.sleep(10)\n",
    "    status_response = agentcore_runtime.status()\n",
    "    status = status_response.endpoint['status']\n",
    "    print(status)\n",
    "status"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "b7f89c56-918a-4cab-beaa-c7ac43a2ba29",
   "metadata": {},
   "source": [
    "### Invoking AgentCore Runtime with Streaming\n",
    "\n",
    "Finally, we can invoke our AgentCore Runtime with a payload and receive streaming responses\n",
    "\n",
    "<div style=\"text-align:left\">\n",
    "    <img src=\"images/invoke.png\" width=\"85%\"/>\n",
    "</div>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3d909e42-e1a0-407f-84c2-3d16cc889cd3",
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "invoke_response = agentcore_runtime.invoke({\n",
    "    \"prompt\": \n",
    "    \"what the weather is like?\"\n",
    "})\n",
    "invoke_response"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "2c1d2bce-be41-478c-8bed-b4037c385795",
   "metadata": {},
   "source": [
    "### Invoking AgentCore Runtime with boto3 for Streaming\n",
    "\n",
    "Now that your AgentCore Runtime was created you can invoke it with any AWS SDK. For streaming responses, you'll need to handle the Server-Sent Events format."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7f84e68d-6c04-41b9-bf5b-60edc3fa0985",
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "import boto3\n",
    "import json\n",
    "from IPython.display import Markdown, display\n",
    "\n",
    "agent_arn = launch_result.agent_arn\n",
    "agentcore_client = boto3.client(\n",
    "    'bedrock-agentcore',\n",
    "    region_name=region\n",
    ")\n",
    "\n",
    "# For streaming responses, we need to handle the EventStream\n",
    "boto3_response = agentcore_client.invoke_agent_runtime(\n",
    "    agentRuntimeArn=agent_arn,\n",
    "    qualifier=\"DEFAULT\",\n",
    "    payload=json.dumps({\"prompt\": \"How much is 2+1\"})\n",
    ")\n",
    "\n",
    "# Check if the response is streaming\n",
    "if \"text/event-stream\" in boto3_response.get(\"contentType\", \"\"):\n",
    "    print(\"Processing streaming response with boto3:\")\n",
    "    content = []\n",
    "    for line in boto3_response[\"response\"].iter_lines(chunk_size=1):\n",
    "        if line:\n",
    "            line = line.decode(\"utf-8\")\n",
    "            if line.startswith(\"data: \"):\n",
    "                data = line[6:].replace('\"', '')  # Remove \"data: \" prefix\n",
    "                print(f\"Received streaming chunk: {data}\")\n",
    "                content.append(data.replace('\"', ''))\n",
    "    \n",
    "    # Display the complete streamed response\n",
    "    full_response = \" \".join(content)\n",
    "    display(Markdown(full_response))\n",
    "else:\n",
    "    # Handle non-streaming response\n",
    "    try:\n",
    "        events = []\n",
    "        for event in boto3_response.get(\"response\", []):\n",
    "            events.append(event)\n",
    "    except Exception as e:\n",
    "        events = [f\"Error reading EventStream: {e}\"]\n",
    "    \n",
    "    if events:\n",
    "        try:\n",
    "            response_data = json.loads(events[0].decode(\"utf-8\"))\n",
    "            display(Markdown(response_data))\n",
    "        except:\n",
    "            print(f\"Raw response: {events[0]}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "streaming_benefits",
   "metadata": {},
   "source": [
    "## Benefits of Streaming Responses\n",
    "\n",
    "Streaming responses provide several key advantages:\n",
    "\n",
    "### User Experience\n",
    "* **Immediate Feedback**: Users see partial results as they become available\n",
    "* **Perceived Performance**: Responses feel faster even if total time is the same\n",
    "* **Progressive Display**: Long responses can be displayed incrementally\n",
    "\n",
    "### Technical Benefits\n",
    "* **Memory Efficient**: Process large responses without loading everything into memory\n",
    "* **Timeout Prevention**: Avoid timeouts on long-running operations\n",
    "* **Real-time Processing**: Handle real-time data as it becomes available\n",
    "\n",
    "### Use Cases\n",
    "* **Content Generation**: Long-form writing, reports, documentation\n",
    "* **Data Analysis**: Progressive results from complex calculations\n",
    "* **Multi-step Workflows**: Show progress through complex agent reasoning\n",
    "* **Real-time Monitoring**: Live updates from monitoring agents"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "7d3fdfe404469632",
   "metadata": {},
   "source": [
    "## Cleanup (Optional)\n",
    "\n",
    "Let's now clean up the AgentCore Runtime created"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7c243e86-a214-483c-aef1-d5243f28ca9e",
   "metadata": {},
   "outputs": [],
   "source": [
    "launch_result.ecr_uri, launch_result.agent_id, launch_result.ecr_uri.split('/')[1]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "76a6cf1416830a54",
   "metadata": {},
   "outputs": [],
   "source": [
    "agentcore_control_client = boto3.client(\n",
    "    'bedrock-agentcore-control',\n",
    "    region_name=region\n",
    ")\n",
    "ecr_client = boto3.client(\n",
    "    'ecr',\n",
    "    region_name=region\n",
    "    \n",
    ")\n",
    "\n",
    "iam_client = boto3.client('iam')\n",
    "\n",
    "runtime_delete_response = agentcore_control_client.delete_agent_runtime(\n",
    "    agentRuntimeId=launch_result.agent_id,\n",
    "    \n",
    ")\n",
    "\n",
    "response = ecr_client.delete_repository(\n",
    "    repositoryName=launch_result.ecr_uri.split('/')[1],\n",
    "    force=True\n",
    ")\n",
    "\n",
    "policies = iam_client.list_role_policies(\n",
    "    RoleName=agentcore_iam_role['Role']['RoleName'],\n",
    "    MaxItems=100\n",
    ")\n",
    "\n",
    "for policy_name in policies['PolicyNames']:\n",
    "    iam_client.delete_role_policy(\n",
    "        RoleName=agentcore_iam_role['Role']['RoleName'],\n",
    "        PolicyName=policy_name\n",
    "    )\n",
    "iam_response = iam_client.delete_role(\n",
    "    RoleName=agentcore_iam_role['Role']['RoleName']\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "b118ad38-feeb-4d1d-9d57-e5c845becc56",
   "metadata": {},
   "source": [
    "# Congratulations!\n",
    "\n",
    "You have successfully implemented and deployed a streaming agent using Amazon Bedrock AgentCore Runtime! \n",
    "\n",
    "## What you've learned:\n",
    "* How to implement streaming responses using async generators\n",
    "* How AgentCore Runtime automatically handles SSE format\n",
    "* How to process streaming responses on the client side\n",
    "* The benefits of streaming for user experience and performance\n",
    "\n",
    "## Next steps:\n",
    "* Experiment with different streaming patterns for your use cases\n",
    "* Implement custom streaming logic for complex multi-step workflows\n",
    "* Explore combining streaming with other AgentCore features like Memory and Gateway\n",
    "* Consider implementing client-side streaming visualization for better UX"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.18"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
